/*
 * Collie - An asynchronous event-driven network framework using Dlang development
 *
 * Copyright (C) 2015-2017  Shanghai Putao Technology Co., Ltd
 *
 * Developer: putao's Dlang team
 *
 * Licensed under the Apache-2.0 License.
 *
 */
module collie.bootstrap.clientmanger;

import collie.net;
import collie.channel;
import kiss.event.timer.common;
import collie.utils.memory;
import kiss.util.functional;
import collie.exception;
import collie.utils.exception;
import collie.net.client.linklogInfo;
import std.exception;

public import kiss.net.TcpStream;
import kiss.event.task;

/**
 * Manages a set of outgoing client connections that all use the same
 * pipeline type.
 *
 * Pending connection attempts are tracked in `_waitConnect`; established
 * connections are kept in an intrusive doubly linked list whose sentinel
 * head is `_list`.
 *
 * Params:
 *     PipeLine = pipeline type produced by the configured pipeline factory.
 */
final class ClientManger(PipeLine)
{
    alias ClientConnection = ClientLink!PipeLine;
    alias PipeLineFactory = PipelineFactory!PipeLine;
    alias ClientCreatorCallBack = void delegate(TcpStream);
    alias ConnCallBack = void delegate(PipeLine);
    alias LinkManger = TLinkManger!ConnCallBack;
    alias LinklogInfo = LinkManger.LinklogInfo;

    /// Creates a manager bound to `loop`; `_list` is an empty sentinel node.
    this(EventLoop loop)
    {
        _loop = loop;
        _list = new ClientConnection();
    }

    /// Destroys the timeout timer if one was started.
    ~this()
    {
        if (_timer)
            _timer.destroy;
    }

    /// Sets a callback that is handed every newly created `TcpStream`
    /// before the connection attempt is started.
    void setClientCreatorCallBack(ClientCreatorCallBack cback)
    {
        _oncreator = cback;
    }

    /// Sets the factory used to build a pipeline for each connected stream.
    void pipelineFactory(shared PipeLineFactory fac)
    {
        _factory = fac;
    }

    /**
     * Starts an asynchronous connection attempt to `to`.
     *
     * The attempt is posted to the event loop as a task; the task records
     * the attempt in `_waitConnect` and then starts connecting.
     *
     * Params:
     *     to    = remote address to connect to.
     *     cback = optional callback; it receives the pipeline on success
     *             (possibly `null` if the factory failed) or `null` once all
     *             retries are exhausted.
     */
    void connect(Address to, ConnCallBack cback = null)
    {
        LinklogInfo* tlogInfo = new LinklogInfo();
        tlogInfo.addr = to;
        tlogInfo.tryCount = 0;
        tlogInfo.cback = cback;
        _loop.postTask(newTask((){
            _waitConnect.addlogInfo(tlogInfo);
            connect(tlogInfo);
        }));
    }

    /// Detaches the connection list from the sentinel head and calls
    /// `close()` on every connection that was in it.
    void close()
    {
        auto con = _list.next;
        _list.next = null;
        while (con)
        {
            // Read the successor first: closing may unlink/free `tcon`.
            auto tcon = con;
            con = con.next;
            tcon.close();
        }
    }

    /// Maximum number of reconnection attempts after a failed connect.
    @property tryCount(){return _tryCount;}
    /// ditto
    @property tryCount(uint count){_tryCount = count;}

    alias heartbeatTimeOut = startTimeOut;

    /**
     * Starts the timeout timer and timing wheel.
     *
     * The timer is not precise; be careful about the resulting error.
     *
     * Params:
     *     s = timeout (presumably in seconds, since it is multiplied by
     *         1000 before being handed to the timer - TODO confirm).
     *
     * Returns: the result of `getTimeWheelConfig(s)`.
     */
    bool startTimeOut(uint s)
    {
        return getTimeWheelConfig(s);
    }

    /// The event loop this manager was created with.
    @property EventLoop eventLoop()
    {
        return _loop;
    }

protected:
    /// Creates a fresh `TcpStream` for `logInfo`, wires up its handlers and
    /// starts connecting to `logInfo.addr`.
    void connect(LinklogInfo* logInfo)
    {
        logInfo.client = new TcpStream(_loop);
        if (_oncreator)
            _oncreator(logInfo.client);
        logInfo.client.setCloseHandle(&tmpCloseCallBack);
        logInfo.client.setConnectHandle(bind(&connectCallBack, logInfo));
        logInfo.client.setReadHandle(&tmpReadCallBack);
        logInfo.client.connect(logInfo.addr);
    }

    /**
     * Connect-result handler bound to a pending attempt.
     *
     * On success a pipeline is built, the user callback is invoked and the
     * new connection is linked into the list. On failure the attempt is
     * retried until `tlogInfo.tryCount` reaches `_tryCount`, after which the
     * attempt is dropped and the user callback is invoked with `null`.
     *
     * Params:
     *     tlogInfo  = the pending attempt; ignored when `null`.
     *     isconnect = `true` if the stream connected successfully.
     */
    void connectCallBack(LinklogInfo* tlogInfo, bool isconnect) nothrow @trusted
    {
        catchAndLogException((){
            import std.exception;
            if (tlogInfo is null)
                return;
            if (isconnect)
            {
                // Whatever happens below, the attempt is no longer pending.
                scope(exit)
                {
                    _waitConnect.rmlogInfo(tlogInfo);
                }
                PipeLine pipe = null;
                collectException(_factory.newPipeline(tlogInfo.client), pipe);
                if (tlogInfo.cback)
                    tlogInfo.cback(pipe);
                if (pipe is null)
                    return;
                ClientConnection con = new ClientConnection(this, pipe);
                // The wheel only exists after startTimeOut()/heartbeatTimeOut()
                // has been called; without this check a manager that never
                // configured a timeout would dereference a null wheel here.
                if (_wheel !is null)
                    _wheel.addNewTimer(con);

                // Push the connection right behind the sentinel head.
                con.next = _list.next;
                if (con.next)
                    con.next.prev = con;
                con.prev = _list;
                _list.next = con;

                con.initialize();
            }
            else
            {
                // Retry up to `_tryCount` times; after that release the
                // attempt and report the failure.
                tlogInfo.client = null;
                if (tlogInfo.tryCount < _tryCount)
                {
                    tlogInfo.tryCount++;
                    connect(tlogInfo);
                }
                else
                {
                    // Copy the callback out before the info is freed.
                    auto cback = tlogInfo.cback;
                    _waitConnect.rmlogInfo(tlogInfo);
                    gcFree(tlogInfo);
                    if (cback)
                        cback(null);
                }
            }
        }());
    }

    /// Placeholder close handler installed while the stream is connecting.
    void tmpCloseCallBack() nothrow{}

    /// Placeholder read handler installed while the stream is connecting.
    void tmpReadCallBack(in ubyte[] buffer) nothrow{}

    /// Unlinks `con` from the connection list and frees it.
    /// A `null` connection, or one that has no predecessor (i.e. is not
    /// linked), is ignored.
    void remove(ClientConnection con)
    {
        if (con is null || con.prev is null)
            return;
        con.prev.next = con.next;
        if (con.next)
            con.next.prev = con.prev;
        gcFree(con);
    }

    /**
     * Chooses the timing-wheel slot count and tick interval for `_timeOut`
     * and starts the timer that advances the wheel.
     *
     * Returns: `false` if `_timeOut` is 0 or a timer already exists,
     *          otherwise the result of `_timer.start`.
     */
    bool getTimeWheelConfig(uint _timeOut)
    {
        if (_timeOut == 0)
            return false;
        // A timer/wheel pair is only ever created once.
        if (_timer)
            return false;

        // Larger timeouts get more slots.
        uint whileSize;
        if (_timeOut <= 40)
            whileSize = 50;
        else if (_timeOut <= 120)
            whileSize = 60;
        else if (_timeOut <= 600)
            whileSize = 100;
        else if (_timeOut < 1000)
            whileSize = 150;
        else
            whileSize = 180;

        // Tick interval: the timeout scaled by 1000, spread over the slots.
        uint time = _timeOut * 1000 / whileSize;

        _timer = new KissTimer(_loop);
        _wheel = new TimingWheel(whileSize);
        _timer.setTimerHandle(()nothrow{_wheel.prevWheel();});
        return _timer.start(time);
    }

private:
    // Sentinel head of the doubly linked list of live connections.
    ClientConnection _list;
    // Connection attempts that have not completed yet.
    LinkManger _waitConnect;

    shared PipeLineFactory _factory;
    // Both stay null until a timeout is started.
    TimingWheel _wheel;
    KissTimer _timer;
    EventLoop _loop;

    uint _tryCount;
    ClientCreatorCallBack _oncreator;
}

package:

/**
 * One established client connection: a node in the manager's intrusive
 * list that is also a wheel timer and the pipeline's manager.
 */
final @trusted class ClientLink(PipeLine) : WheelTimer, PipelineManager
{
    alias ConnectionManger = ClientManger!PipeLine;

    /// Signals the pipeline that the transport is active.
    pragma(inline, true) void initialize()
    {
        _pipe.transportActive();
    }

    /// Signals the pipeline that the transport is inactive.
    pragma(inline, true) void close()
    {
        _pipe.transportInactive();
    }

    /// Forwards a wheel timeout to the pipeline; exceptions are reported
    /// through `showException` instead of propagating.
    override void onTimeOut() nothrow
    {
        try
        {
            _pipe.timeOut();
        }
        catch (Exception e)
        {
            showException(e);
        }
    }

    /// Re-arms this connection's timeout by calling `rest()`.
    override void refreshTimeout()
    {
        rest();
    }

    /// Detaches `pipeline` from its manager, stops this timer and asks the
    /// owning manager to remove this connection.
    override void deletePipeline(PipelineBase pipeline)
    {
        pipeline.pipelineManager = null;
        stop();
        _manger.remove(this);
    }

protected:
    /// Binds `pipe` to this connection, finalizes it and registers this
    /// object as its pipeline manager.
    this(ConnectionManger manger, PipeLine pipe)
    {
        _manger = manger;
        _pipe = pipe;
        _pipe.finalize();
        _pipe.pipelineManager(this);
    }

private:
    /// Used by the manager to create the list's sentinel head.
    this(){}
    ClientLink!PipeLine prev;
    ClientLink!PipeLine next;

private:
    ConnectionManger _manger;
    PipeLine _pipe;
    string _name;
}

package:
/// State of one connection attempt.
struct TLinklogInfo(TCallBack) if(is(TCallBack == delegate))
{
    TcpStream client;   /// stream used for the current attempt
    Address addr;       /// remote address
    uint tryCount = 0;  /// retries performed so far
    TCallBack cback;    /// completion callback, may be null
}